[SPARK-29435][Core] MapId in Shuffle Block is inconsistent between the writer and the reader when spark.shuffle.useOldFetchProtocol=true #26095
Conversation
cc @cloud-fan @xuanyuanking Please take a look at the fix; if it is okay I will add a UT and refactor the code.
```scala
for (part <- startPartition until endPartition) {
  val size = status.getSizeForBlock(part)
  if (size != 0) {
    if (useOldFetchProtocol) {
```
Thanks for the report and fix!
The root cause is that when we set useOldFetchProtocol=true here, the map id on the reader side and the writer side are inconsistent.
But we can't fix it like this: when useOldFetchProtocol=true, we use the old fetch protocol, OpenBlocks, which assumes the map id is an Integer and parses the string directly. So for a big, long-running application whose task attempt ids exceed the Integer range, it will still not work (see the code).
So the right fix, I think, is in ShuffleWriteProcessor: we should fill mapId with either the map task attempt id or the map index, depending on the config spark.shuffle.useOldFetchProtocol.
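A minimal sketch of this suggestion (not the real ShuffleWriteProcessor code; the function and parameter names here are illustrative):

```scala
// Sketch only: pick the mapId that the configured fetch protocol can
// actually represent. Names are illustrative, not Spark's real API.
def chooseMapId(
    useOldFetchProtocol: Boolean, // spark.shuffle.useOldFetchProtocol
    mapIndex: Int,                // partition index of the map task
    taskAttemptId: Long           // unique across the whole application
): Long = {
  if (useOldFetchProtocol) {
    // The old OpenBlocks protocol parses the mapId string as an Int, so
    // only the map index fits; a large Long attempt id would fail to parse.
    mapIndex.toLong
  } else {
    // The new protocol carries a Long, so the unique attempt id is safe.
    taskAttemptId
  }
}
```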
Sorry, could you explain why parsing the map id string as an Integer does not work for a big, long-running application? Is it a performance problem?
Looking forward to your reply.
```diff
  blockManager,
- mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition,
-   SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)),
+ mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
```
This is the shuffle read side and we need to know the value of SHUFFLE_USE_OLD_FETCH_PROTOCOL. I think the bug is in the shuffle write side which is fixed in this PR. Do we really need to change the shuffle read side?
This is redundant code: since the ShuffleWriter writes the mapId based on the spark.shuffle.useOldFetchProtocol flag, MapStatus.mapTaskId always returns the mapId that the ShuffleWriter set.
```scala
splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
  ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex))
```
Here we always pick status.mapTaskId as the mapId; is this correct?
OK I get it now. We should rename MapStatus.mapTaskId to mapId.
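Following that rename suggestion, a minimal stub of how the field might read afterwards (illustrative only; the real MapStatus lives in org.apache.spark.scheduler and carries more state):

```scala
// Illustrative stub, not the real class.
trait MapStatus {
  // Formerly `mapTaskId`: holds whatever id the writer used to name its
  // shuffle files (the task attempt id, or the map index under the old
  // protocol), so `mapId` is the clearer name.
  def mapId: Long
  def getSizeForBlock(reduceId: Int): Long
}
```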
ok to test

Please also update the PR description; the detailed logs and code links can be skipped, since code linked against master will keep changing.
Test build #112010 has finished for PR 26095 at commit
@sandeep-katta In case of the consistency issue, we'd better add a UT for the old fetch protocol config. I opened a PR against your branch, please have a review: sandeep-katta#3.
Test build #112014 has finished for PR 26095 at commit
Test build #112028 has finished for PR 26095 at commit
thanks, merging to master!
What changes were proposed in this pull request?
Shuffle Block Construction during Shuffle Write and Read is wrong
Shuffle Map Task (Shuffle Write)

```
19/10/11 22:07:32| ERROR| [Executor task launch worker for task 3] org.apache.spark.shuffle.IndexShuffleBlockResolver: ####### For Debug ############ /tmp/hadoop-root1/nm-local-dir/usercache/root1/appcache/application_1570422377362_0008/blockmgr-6d03250d-6e7c-4bc2-bbb7-22b8e3981c35/0d/shuffle_0_3_0.index
```

Result Task (Shuffle Read)

```
19/10/11 22:07:32| ERROR| [Executor task launch worker for task 6] org.apache.spark.storage.ShuffleBlockFetcherIterator: Error occurred while fetching local blocks
java.nio.file.NoSuchFileException: /tmp/hadoop-root1/nm-local-dir/usercache/root1/appcache/application_1570422377362_0008/blockmgr-6d03250d-6e7c-4bc2-bbb7-22b8e3981c35/30/shuffle_0_0_0.index
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
```
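The logs above can be read as a block-name mismatch. A hypothetical sketch of what each side computes, with the ids taken from the logs (this case class is a stand-in, not the real org.apache.spark.storage.ShuffleBlockId):

```scala
// Hypothetical illustration of the bug using the ids from the logs above.
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) {
  def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
}

// Writer side: mapId is context.taskAttemptId() (3 in the log above).
val written = ShuffleBlockId(0, 3L, 0).name  // "shuffle_0_3_0"
// Reader side: convertMapStatuses used the partition id (0) instead.
val fetched = ShuffleBlockId(0, 0L, 0).name  // "shuffle_0_0_0"
// written != fetched => the reader looks for an index file that was
// never created, hence the NoSuchFileException.
```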
As per SPARK-25341, the mapId used by SortShuffleManager.getWriter changed from partitionId to context.taskAttemptId() (code).
But MapOutputTracker.convertMapStatuses returns the wrong ShuffleBlock: if spark.shuffle.useOldFetchProtocol is enabled, it returns partitionId as the mapId, which is wrong (code).

Why are the changes needed?
The MapStatus returned by the ShuffleWriter already carries the mapId (e.g. code here), so it is correct to use status.mapTaskId.

Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing UTs, plus manual testing with spark.shuffle.useOldFetchProtocol set to both true and false.